/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.core.handler;

import com.chinamobile.cmss.lakehouse.common.kubernetes.K8sModelConstant;
import com.chinamobile.cmss.lakehouse.core.client.IKubernetesClient;
import com.chinamobile.cmss.lakehouse.core.client.InformerClient;
import com.chinamobile.cmss.lakehouse.core.client.InformerException;
import com.chinamobile.cmss.lakehouse.core.client.impl.KubernetesClientImpl;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ClusterRoleBinding;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1Node;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServicePort;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class K8sDeployHandler {

    @Autowired
    protected IKubernetesClient kubernetesClient;

    @Autowired
    private InformerClient informerClient;

    public boolean checkComponentHealthStatus(String namespace, String statefulSetName) {
        if (!checkNamespaceHealthStatus(namespace)) {
            log.info("namespace {} is not healthy", namespace);
            return false;
        }
        if (!checkStatefulSetHealthStatus(namespace, statefulSetName)) {
            log.info("statefulSet {} in namespace {} is not healthy", statefulSetName, namespace);
            return false;
        }
        return true;
    }

    public boolean checkDeploymentComponentHealthStatus(String namespace, String deploymentSetName) {
        if (!checkNamespaceHealthStatus(namespace)) {
            log.info("namespace {} is not healthy", namespace);
            return false;
        }
        if (!checkDeploymentHealthStatus(namespace, deploymentSetName)) {
            log.info("deployment {} in namespace {} is not healthy", deploymentSetName, namespace);
            return false;
        }
        return true;
    }

    private boolean checkNamespaceHealthStatus(String namespace) {
        V1Namespace v1Namespace = null;
        try {
            v1Namespace =
                kubernetesClient.listNamespace(K8sModelConstant.LABEL_SELECTOR).getItems().stream()
                    .collect(Collectors.toMap(ns -> ns.getMetadata().getName(), ns -> ns)).get(namespace);
        } catch (ApiException e) {
            log.error("Failed to get namespace {} status", namespace, e);
        }
        return v1Namespace != null && "Active".equals(v1Namespace.getStatus().getPhase());
    }

    private boolean checkStatefulSetHealthStatus(String namespace, String statefulSetName) {
        V1StatefulSet statefulSet =
            kubernetesClient.readNamespacedStatefulSet(namespace, statefulSetName).get();
        return statefulSet.getStatus().getReadyReplicas() >= 1;
    }

    private boolean checkDeploymentHealthStatus(String namespace, String deploymentSetName) {
        V1Deployment deployment =
            kubernetesClient.readNamespacedDeployment(namespace, deploymentSetName).get();
        return deployment.getStatus().getReadyReplicas() >= 1;
    }

    public boolean checkCommonResourceExisting(String namespace, String configMap, String svcName,
                                               String statefulSetName) {
        boolean existing = false;
        try {
            existing = kubernetesClient.existConfigMap(namespace, configMap)
                && kubernetesClient.existService(namespace, svcName)
                && kubernetesClient.existStatefulSet(namespace, statefulSetName);
        } catch (ApiException e) {
            log.error("Failed to check common resource existing in namespace {}", namespace, e);
        }
        return existing;
    }

    public String getExternalUrlFromService(String namespace, String serviceName, String portName) {
        String defaultUrl = "0.0.0.0:10085";
        return getWrappedUrlFromService(defaultUrl, K8sServiceHandler.PORT_SEPARATOR, namespace,
            serviceName, portName);
    }

    private String getWrappedUrlFromService(String defaultUrl, String separator, String namespace,
                                            String serviceName, String portName) {
        try {
            List<V1Service> svcList = kubernetesClient.listService(namespace).getItems().stream()
                .filter(s -> serviceName.equals(s.getMetadata().getName())).collect(Collectors.toList());
            if (svcList.isEmpty()) {
                log.error("service {} not found in {}.", serviceName, namespace);
                return defaultUrl;
            }
            V1Service service = svcList.get(0);
            List<V1ServicePort> portList = service.getSpec().getPorts().stream()
                .filter(sp -> portName.equals(sp.getName())).collect(Collectors.toList());
            if (portList.isEmpty()) {
                log.error("port {} not found in {}.", portName, serviceName);
                return defaultUrl;
            }
            List<V1Node> nodeList = kubernetesClient.listNode(null).getItems();
            return nodeList.get(0).getMetadata().getName() + separator + portList.get(0).getPort();
        } catch (ApiException e) {
            log.error("Failed to get external url from service {} in namespace {} via port name {}",
                serviceName, namespace, portName);
            return defaultUrl;
        }
    }

    public void createClusterRoleBinding(V1ClusterRoleBinding v1ClusterRoleBinding)
        throws InformerException {
        String crbName = v1ClusterRoleBinding.getMetadata().getName();
        try {
            // create clusterRoleBinding if not existed
            if (!kubernetesClient.existClusterRoleBinding(crbName)) {
                informerClient.createClusterRoleBinding(v1ClusterRoleBinding);
            }
        } catch (ApiException e) {
            log.error("Failed to check clusterRoleBinding {} existing", crbName, e);
            throw new InformerException(e);
        }
    }

    public void rolloutRestartStatefulSet(String statefulSetName, String namespace) throws ApiException {
        Optional<V1StatefulSet> statefulSetOptional = kubernetesClient.readNamespacedStatefulSet(namespace, statefulSetName);
        if (statefulSetOptional.isPresent()) {
            V1StatefulSet runningStatefulSet = statefulSetOptional.get();
            runningStatefulSet
                .getSpec()
                .getTemplate()
                .getMetadata()
                .putAnnotationsItem("kubectl.kubernetes.io/restartedAt", LocalDateTime.now().toString());
            String statefulSetJson = kubernetesClient.getApiClient().getJSON().serialize(runningStatefulSet);
            informerClient.patchStatefulSet(namespace, statefulSetJson, statefulSetName);
        }
    }

    public void deploy(String namespace, List<KubernetesObject> k8sObjs) throws InformerException {
        try {
            // create namespace if not existed
            if (!kubernetesClient.existNamespace(namespace)) {
                informerClient.createNamespace(KubernetesClientImpl.buildNamespace(namespace));
            }
        } catch (ApiException e) {
            log.error("Failed to check namespace {} existing", namespace, e);
            throw new InformerException(e);
        }

        // create components in namespace
        for (KubernetesObject k8sObj : k8sObjs) {

            Class<?> aClass = k8sObj.getClass();
            if (aClass.isAssignableFrom(V1ConfigMap.class)) {
                informerClient.createConfigMap(namespace, (V1ConfigMap) k8sObj);
            }

            if (aClass.isAssignableFrom(V1Service.class)) {
                informerClient.createService(namespace, (V1Service) k8sObj);
            }

            if (aClass.isAssignableFrom(V1StatefulSet.class)) {
                informerClient.createStatefulSet(namespace, (V1StatefulSet) k8sObj);
            }

            if (aClass.isAssignableFrom(V1Deployment.class)) {
                informerClient.createDeployment(namespace, (V1Deployment) k8sObj);
            }

        }
    }

}